package com.atguigu.flink.watermark;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import com.atguigu.flink.utils.MyUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Created by Smexy on 2023/11/14
 *
 * <p>Demo of tumbling event-time windows. The job reads text lines from a socket,
 * maps each line with {@link WaterSensorMapFunction}, assigns event timestamps taken
 * from {@code WaterSensor.getTs()} under a monotonous-timestamps watermark strategy,
 * groups all records into 5-second tumbling event-time windows and prints one line
 * per fired window.
 */
public class Demo2_EventTimeWindow
{
    /**
     * Builds the pipeline and runs it.
     *
     * @param args command-line arguments; not read by this demo
     * @throws IllegalStateException if the job fails; the original exception
     *                               is preserved as the cause
     */
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Default parallelism of 1 for every operator in this job.
        env.setParallelism(1);

        // Timestamps are expected to be ascending; the event time of each record is its getTs() value.
        // NOTE(review): the window arithmetic below presumes getTs() is in milliseconds - confirm against WaterSensor.
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                .<WaterSensor>forMonotonousTimestamps()
                .withTimestampAssigner((e, ts) -> e.getTs());

        env
                .socketTextStream("hadoop102", 8888)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(watermarkStrategy)
                /*
                  Event-time based window computation. Here the watermark acts as the clock.
                     5s tumbling time windows:
                         First window:  [0,5000), equivalent to [0,4999]. Evaluated once the watermark reaches 4999.
                         Second window: [5000,9999]. Evaluated once the watermark reaches 9999.
                         ....
                 */
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessAllWindowFunction<WaterSensor, String, TimeWindow>()
                {
                    /**
                     * Emits one string per fired window: the window rendered by
                     * {@code MyUtil.parseTimeWindow}, a colon, then the window's
                     * elements rendered by {@code MyUtil.parseToList}.
                     */
                    @Override
                    public void process(Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        TimeWindow window = context.window();
                        out.collect(MyUtil.parseTimeWindow(window) +":" +MyUtil.parseToList(elements));
                    }
                })
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            // Propagate instead of printStackTrace(): otherwise main returns normally and
            // a failed job is indistinguishable from a successful one (exit status 0).
            throw new IllegalStateException("Flink job execution failed", e);
        }
    }
}
